/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta

import java.io.File

import org.apache.spark.sql.delta.actions.Protocol
import org.apache.commons.io.FileUtils

import org.apache.spark.sql.AnalysisException

/**
 * We store generation expressions in the columns' metadata. Because Spark propagates column
 * metadata to downstream operations when reading a table, old versions may create tables whose
 * schema carries generation expressions even though the table has an old writer version. This
 * test suite verifies that such tables behave as normal tables. In other words, new versions
 * that understand generated columns should ignore these generation expressions so that all
 * versions behave the same.
 */
class GeneratedColumnCompatibilitySuite extends GeneratedColumnTest {
  import GeneratedColumn._
  import testImplicits._

  /**
   * Copies the checked-in `dbr_8_0_non_generated_columns` table into a temporary directory,
   * sanity-checks the copy, and then runs `func` with the canonical path of that copy.
   *
   * The checked-in table is a special table generated by the following steps:
   *
   * 1. Run the following command using DBR 8.1 to generate a generated column table.
   *
   * ```
   * spark.sql("""CREATE TABLE generated_columns_table(
   *             |c1 INT,
   *             |c2 INT GENERATED ALWAYS AS ( c1 + 1 )
   *             |) USING DELTA
   *             |LOCATION 'sql/core/src/test/resources/delta/dbr_8_1_generated_columns'
   *             |""".stripMargin)
   * ```
   *
   * 2. Run the following command using DBR 8.0 to read the above table and create a new one.
   *
   * ```
   * spark.sql("""CREATE TABLE delta_non_generated_columns
   *             |USING DELTA
   *             |LOCATION 'sql/core/src/test/resources/delta/dbr_8_0_non_generated_columns'
   *             |AS SELECT * FROM
   *             |delta.`sql/core/src/test/resources/delta/dbr_8_1_generated_columns`
   *             |""".stripMargin)
   * ```
   *
   * Now the schema of `dbr_8_0_non_generated_columns` will contain generation expressions but it
   * has an old writer version. The tests built on this helper verify that this table is treated
   * as a non generated column table, which means new versions will have the exact behaviors as
   * the old versions when reading or writing this table.
   *
   * @param func the test body; it receives the canonical path of the temporary copy of the table
   */
  def withDBR8_0Table(func: String => Unit): Unit = {
    val resourcePath = "src/test/resources/delta/dbr_8_0_non_generated_columns"
    withTempDir { tempDir =>
      // Prepare a table that has the old writer version and generation expressions. The test
      // body always works on this copy rather than on the checked-in resource directory.
      FileUtils.copyDirectory(new File(resourcePath), tempDir)
      val path = tempDir.getCanonicalPath
      val snapshot = DeltaLog.forTable(spark, path).snapshot
      // Verify the test table has the old writer version and generation expressions
      assert(hasGeneratedColumns(snapshot.metadata.schema))
      assert(!enforcesGeneratedColumns(snapshot.protocol, snapshot.metadata))
      func(path)
    }
  }

  test("dbr 8_0") {
    withDBR8_0Table { path =>
      withTempDir { normalTableDir =>
        val normalTablePath = normalTableDir.getCanonicalPath
        val normalTableName = "generated_columns_table"
        // The CREATE TABLE below registers `normalTableName` in the session catalog. `withTable`
        // drops that entry once the body finishes, so the name does not leak into other tests.
        // It is nested inside `withTempDir` so the drop happens while the location still exists.
        withTable(normalTableName) {
          // Prepare a normal table
          spark.sql(
            s"""CREATE TABLE $normalTableName(
               |c1 INT,
               |c2 INT
               |) USING DELTA
               |LOCATION '$normalTablePath'
               |""".stripMargin)

          // Now we are going to verify that commands on `path` and `normalTablePath` behave
          // the same.

          // Update `path` and `normalTablePath` using the same func and verify they have the
          // same result
          def updateTableAndCheckAnswer(func: String => Unit): Unit = {
            func(path)
            func(normalTablePath)
            checkAnswer(
              spark.read.format("delta").load(path),
              spark.read.format("delta").load(normalTablePath)
            )
          }

          // Run `func` against `path` and then `normalTablePath`, and verify that each run fails
          // with an AnalysisException reporting that column c2 is missing from the INSERT.
          // Currently not invoked by any check in this test.
          def updateTableAndAssertMissingColumn(func: String => Unit): Unit = {
            Seq(path, normalTablePath).foreach { tablePath =>
              val e = intercept[AnalysisException] {
                func(tablePath)
              }
              assert(e.getMessage.contains("Column c2 is not specified in INSERT"))
            }
          }

          // Insert values that violate the generation expression should be okay because the
          // table should not be treated as a generated column table.
          updateTableAndCheckAnswer { tablePath =>
            sql(s"INSERT INTO delta.`$tablePath`VALUES(1, 10)")
          }
          updateTableAndCheckAnswer { tablePath =>
            sql(s"INSERT INTO delta.`$tablePath`(c2, c1) VALUES(11, 1)")
          }
          updateTableAndCheckAnswer { tablePath =>
            sql(s"INSERT OVERWRITE delta.`$tablePath`VALUES(1, 13)")
          }
          updateTableAndCheckAnswer { tablePath =>
            sql(s"INSERT OVERWRITE delta.`$tablePath`(c2, c1) VALUES(14, 1)")
          }
          updateTableAndCheckAnswer { tablePath =>
            // Append (1, null) to the table
            Seq(1).toDF("c1").write.format("delta").mode("append").save(tablePath)
          }
          updateTableAndCheckAnswer { tablePath =>
            Seq(1 -> 15).toDF("c1", "c2").write.format("delta").mode("append").save(tablePath)
          }
          updateTableAndCheckAnswer { tablePath =>
            // Overwrite the table with (2, null)
            Seq(2).toDF("c1").write.format("delta").mode("overwrite").save(tablePath)
          }
        }
      }
    }
  }

  test("adding a new column should not enable generated columns") {
    withDBR8_0Table { path =>
      val deltaLog = DeltaLog.forTable(spark, path)
      val protocolBeforeUpdate = deltaLog.snapshot.protocol
      sql(s"ALTER TABLE delta.`$path` ADD COLUMNS (c3 INT)")
      // Inspect the snapshot returned by the refresh so that every assertion below looks at the
      // same post-ALTER state.
      val snapshot = deltaLog.update()
      // The generation expressions should be dropped
      assert(!hasGeneratedColumns(snapshot.metadata.schema))
      // Adding a column must leave the protocol untouched
      assert(snapshot.protocol == protocolBeforeUpdate)
      assert(!enforcesGeneratedColumns(snapshot.protocol, snapshot.metadata))
    }
  }

  test("specifying a min writer version should not enable generated column") {
    withDBR8_0Table { path =>
      val deltaLog = DeltaLog.forTable(spark, path)
      sql(s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.minWriterVersion'='4')")
      // Inspect the snapshot returned by the refresh so that every assertion below looks at the
      // same post-ALTER state.
      val snapshot = deltaLog.update()
      // The generation expressions should be dropped
      assert(!hasGeneratedColumns(snapshot.metadata.schema))
      // The writer version is raised to the requested value
      assert(snapshot.protocol == Protocol(1, 4))
      assert(!enforcesGeneratedColumns(snapshot.protocol, snapshot.metadata))
    }
  }
}
